import contextlib
import json
from unittest.mock import patch

from odoo import fields
from odoo.addons.base.tests.common import SavepointCaseWithUserDemo
from odoo.tests import common
from odoo.tools.misc import file_open, mute_logger
from odoo.tools.translate import code_translations


def message(msg, type_='error', from_=0, to_=0, record=0, field='value', **kwargs):
    """Build one expected entry of the ``messages`` list returned by an import.

    :param msg: text stored under the ``message`` key
    :param type_: value of the ``type`` key
    :param from_: first row index, stored as ``rows['from']``
    :param to_: last row index, stored as ``rows['to']``
    :param record: value of the ``record`` key
    :param field: value of the ``field`` key
    :param kwargs: extra keys to include; the named arguments above win on
        any key collision
    :return: a new dict combining ``kwargs`` and the keys described above
    """
    payload = dict(kwargs)
    payload.update(
        type=type_,
        rows={'from': from_, 'to': to_},
        record=record,
        field=field,
        message=msg,
    )
    return payload


def moreaction(**kwargs):
    """Build the window-action dict expected as ``moreinfo`` of a message.

    :param kwargs: extra keys to include (e.g. ``res_model``, ``domain``);
        the fixed keys set below win on any key collision
    :return: a new dict combining ``kwargs`` and the fixed action keys
    """
    return {
        **kwargs,
        'type': 'ir.actions.act_window',
        'target': 'new',
        'context': {'create': False},
        'name': 'Possible Values',
        'view_mode': 'list,form',
        'views': [(False, 'list'), (False, 'form')],
        'help': "See all possible values",
    }


def values(seq, field='value'):
    """Return the list of ``item[field]`` for every item of ``seq``, in order.

    :param seq: iterable of subscriptable items
    :param field: key looked up on each item
    """
    extracted = []
    for entry in seq:
        extracted.append(entry[field])
    return extracted


class ImporterCase(common.TransactionCase):
    """Base class for import tests.

    Subclasses set ``model_name``; ``setUp`` then exposes the corresponding
    model as ``self.model`` and the helpers below operate on it.
    """

    # technical name of the model under test, to be overridden by subclasses
    model_name = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model = None

    def setUp(self):
        """Bind ``self.model`` and clear the registry and cursor caches."""
        super().setUp()
        self.model = self.env[self.model_name]
        self.env.registry.clear_cache()
        self.cr.cache.clear()

    def import_(self, fields, rows, context=None):
        """Call ``load(fields, rows)`` on the model with ``import_file`` set.

        :param fields: list of column names passed to ``load``
        :param rows: list of rows (lists of cells) passed to ``load``
        :param context: optional extra context keys; the mapping is copied,
            never modified
        :return: whatever ``load`` returns
        """
        # Work on a copy: updating the caller's dict in place would leak
        # ``import_file`` into any context object a test reuses afterwards.
        context = dict(context or {}, import_file=True)
        return self.model.with_context(context).load(fields, rows)

    def read(self, fields=('value',), domain=(), context=None):
        """Search the model with ``domain`` and ``read`` ``fields`` on the result."""
        records = self.model.with_context(context or {}).search(domain)
        return records.read(fields)

    def browse(self, domain=(), context=None):
        """Return the records of the model matching ``domain``."""
        return self.model.with_context(context or {}).search(domain)

    def xid(self, record):
        """Return an external id for ``record``, creating one if none exists.

        An existing ``ir.model.data`` entry is returned as ``module.name``
        (or just ``name`` when it has no module). Otherwise a new entry is
        created in the ``__test__`` module, named after the record's
        ``display_name``.
        """
        ModelData = self.env['ir.model.data']

        data = ModelData.search([('model', '=', record._name), ('res_id', '=', record.id)])
        if data:
            d = data.read(['name', 'module'])[0]
            if d['module']:
                return '%s.%s' % (d['module'], d['name'])
            return d['name']

        name = record.display_name
        # fix dotted display_name results, otherwise xid lookups blow up
        name = name.replace('.', '-')
        ModelData.create(
            {
                'name': name,
                'model': record._name,
                'res_id': record.id,
                'module': '__test__',
            }
        )
        return '__test__.' + name


class test_ids_stuff(ImporterCase):
    """Creating and updating records through the ``.id`` and ``id`` columns."""

    model_name = 'export.integer'

    def test_create_with_id(self):
        """An unknown database id yields an error and no ids."""
        outcome = self.import_(['.id', 'value'], [['42', '36']])
        self.assertIs(outcome['ids'], False)
        expected = [message("Unknown database identifier '42'", field='.id')]
        self.assertEqual(outcome['messages'], expected)

    def test_create_with_xid(self):
        """A new external id creates a record registered under ``__import__``."""
        outcome = self.import_(['id', 'value'], [['somexmlid', '42']])
        self.assertEqual(len(outcome['ids']), 1)
        self.assertFalse(outcome['messages'])
        created = self.browse()[0]
        self.assertEqual('__import__.somexmlid', self.xid(created))

    def test_update_with_id(self):
        """Importing with an existing database id overwrites that record."""
        existing = self.model.create({'value': 36})
        self.assertEqual(36, existing.value)

        outcome = self.import_(['.id', 'value'], [[str(existing.id), '42']])
        self.assertEqual(len(outcome['ids']), 1)
        self.assertFalse(outcome['messages'])
        # the stored value is now the imported one
        self.assertEqual([42], values(self.read()))

    def test_update_with_xid(self):
        """Re-importing the same external id updates the single record."""
        for raw, stored in (('36', 36), ('1234567', 1234567)):
            self.import_(['id', 'value'], [['somexmlid', raw]])
            self.assertEqual([stored], values(self.read()))


class test_boolean_field(ImporterCase):
    """Import of boolean cells: accepted and rejected spellings."""

    model_name = 'export.boolean'

    def test_empty(self):
        """Importing no rows gives an empty result."""
        nothing = {'ids': [], 'messages': [], 'nextrow': False}
        self.assertEqual(self.import_(['value'], []), nothing)

    def test_exported(self):
        """The ``False``/``True`` spellings are read back as booleans."""
        outcome = self.import_(['value'], [['False'], ['True']])
        self.assertEqual(len(outcome['ids']), 2)
        self.assertFalse(outcome['messages'])
        stored = self.read()
        self.assertEqual([False, True], values(stored))

    def test_falses(self):
        """Every listed spelling of "false" imports as ``False``."""
        Lang = self.env['res.lang']
        for code in ('fr_FR', 'de_DE', 'ru_RU', 'nl_BE', 'lt_LT'):
            Lang._activate_lang(code)

        spellings = [
            '0',
            'no',
            'false',
            'FALSE',
            '',
            'faux',  # false, fr
            'falsch',  # false, de
            'ложь',  # false, ru
            'onwaar',  # false, nl
            'ne',  # no, lt
        ]
        rows = [[cell] for cell in spellings]

        outcome = self.import_(['value'], rows)
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), len(rows))
        self.assertEqual([False] * len(rows), values(self.read()))

    def test_trues(self):
        """Unknown spellings are reported as errors; only known ones import."""
        # Invalid boolean values make the whole import fail, so no ids are
        # returned for the first batch.
        accepted = ('true', 'yes', '1')
        candidates = [
            'None',
            'nil',
            '()',
            'f',
            '#f',
            # Problem: OpenOffice (and probably excel) output localized booleans
            'VRAI',
            'ok',
            'true',
            'yes',
            '1',
        ]
        outcome = self.import_(['value'], [[cell] for cell in candidates])
        self.assertEqual(outcome['ids'], False)

        expected = []
        for position, cell in enumerate(candidates):
            if cell in accepted:
                continue
            expected.append(
                message(
                    "Unknown value '%s' for boolean field 'Value'" % cell,
                    from_=position,
                    to_=position,
                    record=position,
                    moreinfo="Use '1' for yes and '0' for no",
                    field_name='Value',
                )
            )
        self.assertEqual(outcome['messages'], expected)

        # Only correct boolean values are accepted.
        outcome = self.import_(['value'], [[cell] for cell in ('1', 'yes', 'true')])
        self.assertEqual(len(outcome['ids']), 3)
        self.assertEqual([True] * 3, values(self.read()))


class test_integer_field(ImporterCase):
    """Import of integer cells: empty, zero, range limits and garbage."""

    model_name = 'export.integer'

    def test_none(self):
        """Importing no rows gives an empty result."""
        nothing = {'ids': [], 'messages': [], 'nextrow': False}
        self.assertEqual(self.import_(['value'], []), nothing)

    def test_empty(self):
        """An empty cell creates a record whose value reads as ``False``."""
        outcome = self.import_(['value'], [['']])
        self.assertEqual(len(outcome['ids']), 1)
        self.assertFalse(outcome['messages'])
        self.assertEqual([False], values(self.read()))

    def test_zero(self):
        """Both ``0`` and ``-0`` import without messages."""
        for raw in ('0', '-0'):
            outcome = self.import_(['value'], [[raw]])
            self.assertEqual(len(outcome['ids']), 1)
            self.assertFalse(outcome['messages'])

        self.assertEqual([False, False], values(self.read()))

    def test_positives(self):
        """Positive integers up to ``2**31 - 1`` are stored as given."""
        rows = [
            ['1'],
            ['42'],
            [str(2**31 - 1)],
            ['12345678'],
        ]
        outcome = self.import_(['value'], rows)
        self.assertEqual(len(outcome['ids']), 4)
        self.assertFalse(outcome['messages'])

        stored = [1, 42, 2**31 - 1, 12345678]
        self.assertEqual(stored, values(self.read()))

    def test_negatives(self):
        """Negative integers down to ``-(2**31)`` are stored as given."""
        rows = [
            ['-1'],
            ['-42'],
            [str(-(2**31 - 1))],
            [str(-(2**31))],
            ['-12345678'],
        ]
        outcome = self.import_(['value'], rows)
        self.assertEqual(len(outcome['ids']), 5)
        self.assertFalse(outcome['messages'])

        stored = [-1, -42, -(2**31 - 1), -(2**31), -12345678]
        self.assertEqual(stored, values(self.read()))

    @mute_logger('odoo.sql_db', 'odoo.models')
    def test_out_of_range(self):
        """Values outside the integer range produce an error without a field key."""
        overflow = [
            {
                'type': 'error',
                'rows': {'from': 0, 'to': 0},
                'record': 0,
                'message': "integer out of range\n",
            }
        ]
        for raw in (str(2**31), str(-(2**32))):
            outcome = self.import_(['value'], [[raw]])
            self.assertIs(outcome['ids'], False)
            self.assertEqual(outcome['messages'], overflow)

    def test_nonsense(self):
        """A non-numeric cell is reported as not being an integer."""
        outcome = self.import_(['value'], [['zorglub']])
        self.assertIs(outcome['ids'], False)
        expected = [
            message(
                "'zorglub' does not seem to be an integer for field 'Value'",
                field_name='Value',
            )
        ]
        self.assertEqual(outcome['messages'], expected)


class test_float_field(ImporterCase):
    """Import of float cells: empty, zero, large, small and garbage values."""

    model_name = 'export.float'

    def test_none(self):
        """Importing no rows gives an empty result."""
        nothing = {'ids': [], 'messages': [], 'nextrow': False}
        self.assertEqual(self.import_(['value'], []), nothing)

    def test_empty(self):
        """An empty cell creates a record whose value reads as ``False``."""
        outcome = self.import_(['value'], [['']])
        self.assertEqual(len(outcome['ids']), 1)
        self.assertFalse(outcome['messages'])
        self.assertEqual([False], values(self.read()))

    def test_zero(self):
        """Both ``0`` and ``-0`` import without messages."""
        for raw in ('0', '-0'):
            outcome = self.import_(['value'], [[raw]])
            self.assertEqual(len(outcome['ids']), 1)
            self.assertFalse(outcome['messages'])

        self.assertEqual([False, False], values(self.read()))

    def test_positives(self):
        """Positive values, including ``2**33`` and ``0.000001``, are stored as given."""
        rows = [
            ['1'],
            ['42'],
            [str(2**31 - 1)],
            ['12345678'],
            [str(2**33)],
            ['0.000001'],
        ]
        outcome = self.import_(['value'], rows)
        self.assertEqual(len(outcome['ids']), 6)
        self.assertFalse(outcome['messages'])

        stored = [1, 42, 2**31 - 1, 12345678, 2.0**33, 0.000001]
        self.assertEqual(stored, values(self.read()))

    def test_negatives(self):
        """Negative values, including ``-(2**33)`` and ``-0.000001``, are stored as given."""
        rows = [
            ['-1'],
            ['-42'],
            [str(-(2**31) + 1)],
            [str(-(2**31))],
            ['-12345678'],
            [str(-(2**33))],
            ['-0.000001'],
        ]
        outcome = self.import_(['value'], rows)
        self.assertEqual(len(outcome['ids']), 7)
        self.assertFalse(outcome['messages'])

        stored = [-1, -42, -(2**31 - 1), -(2**31), -12345678, -(2.0**33), -0.000001]
        self.assertEqual(stored, values(self.read()))

    def test_nonsense(self):
        """A non-numeric cell is reported as not being a number."""
        outcome = self.import_(['value'], [['foobar']])
        self.assertIs(outcome['ids'], False)
        expected = [
            message(
                "'foobar' does not seem to be a number for field 'Value'",
                field_name='Value',
            )
        ]
        self.assertEqual(outcome['messages'], expected)


class test_string_field(ImporterCase):
    """Import into the bounded string model."""

    model_name = 'export.string.bounded'

    def test_empty(self):
        """An empty cell creates a record whose value reads as ``False``."""
        outcome = self.import_(['value'], [['']])
        self.assertEqual(len(outcome['ids']), 1)
        self.assertFalse(outcome['messages'])
        self.assertEqual([False], values(self.read()))

    def test_imported(self):
        """Short values come back unchanged, long ones come back shortened."""
        rows = [
            ['foobar'],
            ['foobarbaz'],
            ['Með suð í eyrum við spilum endalaust'],
            ["People 'get' types. They use them all the time. Telling someone he can't pound a nail with a banana doesn't much surprise him."],
        ]
        outcome = self.import_(['value'], rows)
        self.assertEqual(len(outcome['ids']), 4)
        self.assertFalse(outcome['messages'])

        stored = [
            "foobar",
            "foobarbaz",
            "Með suð í eyrum ",
            "People 'get' typ",
        ]
        self.assertEqual(stored, values(self.read()))


class test_unbound_string_field(ImporterCase):
    """Import into the unbounded string model."""

    model_name = 'export.string'

    def test_imported(self):
        """Imported strings are read back exactly as given."""
        phrases = [
            "í dag viðrar vel til loftárása",
            # ackbar.jpg
            "If they ask you about fun, you tell them – fun is a filthy parasite",
        ]
        outcome = self.import_(['value'], [[phrase] for phrase in phrases])
        self.assertEqual(len(outcome['ids']), 2)
        self.assertFalse(outcome['messages'])
        self.assertEqual(phrases, values(self.read()))


class test_required_string_field(ImporterCase):
    """Import into a model whose ``value`` field is required."""

    model_name = 'export.string.required'

    def _assert_missing_value(self, outcome):
        """Check ``outcome`` holds exactly one "missing required value" error."""
        self.assertEqual(len(outcome['messages']), 1)
        actual = outcome['messages'][0]
        wanted = message("Missing required value for the field 'Value' (value)")
        # the message text only has to contain the expected sentence; the
        # remaining keys must match exactly
        self.assertIn(wanted.pop('message'), actual.pop('message'))
        self.assertEqual(actual, wanted)
        self.assertIs(outcome['ids'], False)

    @mute_logger('odoo.sql_db', 'odoo.models')
    def test_empty(self):
        """A row without any cell reports the missing required value."""
        self._assert_missing_value(self.import_(['value'], [[]]))

    @mute_logger('odoo.sql_db', 'odoo.models')
    def test_not_provided(self):
        """Importing only another column reports the missing required value."""
        self._assert_missing_value(self.import_(['const'], [['12']]))

    @mute_logger('odoo.sql_db', 'odoo.models')
    def test_ignore_excess_messages(self):
        """With 100 failing rows, ten errors are followed by one closing warning."""
        rows = [[str(number)] for number in range(100)]
        outcome = self.import_(['const'], rows)
        self.assertIs(outcome['ids'], False)

        reported = outcome['messages']
        self.assertEqual(len(reported), 11)
        *errors, closing = reported
        for entry in errors:
            self.assertEqual(entry['type'], 'error')
            self.assertIn("Missing required value for the field 'Value' (value)", entry['message'])
        self.assertEqual(closing['type'], 'warning')
        self.assertEqual(
            closing['message'],
            "Found more than 10 errors and more than one error per 10 records, interrupted to avoid showing too many errors.",
        )


class test_text(ImporterCase):
    """Import into the text model."""

    model_name = 'export.text'

    def test_empty(self):
        """An empty cell creates a record whose value reads as ``False``."""
        outcome = self.import_(['value'], [['']])
        self.assertEqual(len(outcome['ids']), 1)
        self.assertFalse(outcome['messages'])
        self.assertEqual([False], values(self.read()))

    def test_imported(self):
        """A long multi-paragraph non-ASCII text is read back unchanged."""
        fragments = (
            "Breiðskífa er notað um útgefna hljómplötu sem inniheldur ",
            "stúdíóupptökur frá einum flytjanda. Breiðskífur eru oftast ",
            "milli 25-80 mínútur og er lengd þeirra oft miðuð við 33⅓ ",
            "snúninga 12 tommu vínylplötur (sem geta verið allt að 30 mín ",
            "hvor hlið).\n\nBreiðskífur eru stundum tvöfaldar og eru þær þá",
            " gefnar út á tveimur geisladiskum eða tveimur vínylplötum.",
        )
        text = "".join(fragments)
        outcome = self.import_(['value'], [[text]])
        self.assertEqual(len(outcome['ids']), 1)
        self.assertFalse(outcome['messages'])
        self.assertEqual([text], values(self.read()))


class test_selection(ImporterCase):
    """Import of selection cells given by label, by value or by translated label."""

    model_name = 'export.selection'
    # French labels assigned to the selection entries in the translated test
    translations_fr = {
        "Foo": "tete",
        "Bar": "titi",
        "Qux": "toto",
    }

    def test_imported(self):
        """Labels and raw values both resolve to the stored selection value."""
        cells = ['Qux', 'Bar', 'Foo', '2']
        outcome = self.import_(['value'], [[cell] for cell in cells])
        self.assertEqual(len(outcome['ids']), 4)
        self.assertFalse(outcome['messages'])
        self.assertEqual(['3', '2', '1', '2'], values(self.read()))

    def test_imported_translated(self):
        """Translated labels resolve under ``lang=fr_FR``; the source label still works."""
        self.env['res.lang']._activate_lang('fr_FR')
        # add translations for selection
        Selection = self.env['ir.model.fields.selection']
        translatable = Selection.search([('name', 'in', list(self.translations_fr))])
        for entry in translatable:
            entry.with_context(lang='fr_FR').name = self.translations_fr[entry.name]

        french = {'lang': 'fr_FR'}
        outcome = self.import_(['value'], [['toto'], ['tete'], ['titi']], context=french)
        self.assertEqual(len(outcome['ids']), 3)
        self.assertFalse(outcome['messages'])

        self.assertEqual(['3', '1', '2'], values(self.read()))

        outcome = self.import_(['value'], [['Foo']], context={'lang': 'fr_FR'})
        self.assertEqual(len(outcome['ids']), 1)
        self.assertFalse(outcome['messages'])

    def test_invalid(self):
        """Unknown labels and values are rejected with the list of valid choices."""
        for bad in ('Baz', '42'):
            outcome = self.import_(['value'], [[bad]])
            self.assertIs(outcome['ids'], False)
            expected = [
                message(
                    "Value '%s' not found in selection field 'Value'" % bad,
                    moreinfo="Foo Bar Qux 4".split(),
                    field_name='Value',
                    field_path=['value'],
                )
            ]
            self.assertEqual(outcome['messages'], expected)


class test_selection_with_default(ImporterCase):
    """Import into a selection model whose ``value`` field has a default."""

    model_name = 'export.selection.withdefault'

    def test_empty(self):
        """Empty cells should set corresponding field to False"""
        outcome = self.import_(['value'], [['']])
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 1)

        stored = values(self.read())
        self.assertEqual(stored, [False])

    def test_default(self):
        """Non-provided cells should set corresponding field to default"""
        outcome = self.import_(['const'], [['42']])
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 1)

        stored = values(self.read())
        self.assertEqual(stored, ['2'])


class test_selection_function(ImporterCase):
    """Import into a selection field whose choices come from a function."""

    model_name = 'export.selection.function'
    # French code translations injected by ``test_translated``
    translations_fr = {
        "Corge": "toto",
        "Grault": "titi",
        "Wheee": "tete",
        "Moog": "tutu",
    }

    def test_imported(self):
        """import uses fields_get, so translates import label (may or may not
        be good news) *and* serializes the selection function to reverse it:
        import does not actually know that the selection field uses a function
        """
        # NOTE: conflict between a value and a label => pick first
        rows = [['3'], ["Grault"]]
        outcome = self.import_(['value'], rows)
        self.assertEqual(len(outcome['ids']), 2)
        self.assertFalse(outcome['messages'])
        self.assertEqual(values(self.read()), ['3', '1'])

    def test_translated(self):
        """Expects output of selection function returns translated labels"""
        self.env['res.lang']._activate_lang('fr_FR')
        # hack into code translations; the entry is removed again on cleanup
        key = ('test_import_export', 'fr_FR')
        code_translations.python_translations[key] = self.translations_fr
        self.addCleanup(code_translations.python_translations.pop, key)

        outcome = self.import_(['value'], [['titi'], ['tete']], context={'lang': 'fr_FR'})
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 2)
        self.assertEqual(values(self.read()), ['1', '2'])

        outcome = self.import_(['value'], [['Wheee']], context={'lang': 'fr_FR'})
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 1)


class test_m2o(ImporterCase):
    # Import of many2one cells on 'export.many2one', whose 'value' field
    # points to 'export.integer': by display name, by external id ('/id'),
    # by database id ('/.id'), plus the error messages of each failure mode.
    model_name = 'export.many2one'

    def test_by_name(self):
        # Importing by display name links the right records and costs exactly
        # 9 queries once caches are warm.
        # create integer objects
        record1 = self.env['export.integer'].create({'value': 42})
        record2 = self.env['export.integer'].create({'value': 36})

        # preheat the oven
        # NOTE(review): the warm-up imports are presumably rolled back when the
        # savepoint is closed (the final read() below expects only the three
        # records of the measured import) -- confirm against cr.savepoint().
        for _ in range(5):
            with contextlib.closing(self.env.cr.savepoint(flush=False)):
                self.import_(['value'], [[record1.display_name], [record1.display_name], [record2.display_name]])

        # 1 x SAVEPOINT load
        # 3 x name_search
        # 1 x SAVEPOINT _load_records
        # 1 x select on list of existing modules
        # 1 x insert
        # 1 x RELEASE SAVEPOINT _load_records
        # 1 x RELEASE SAVEPOINT load
        # => 9
        with self.assertQueryCount(9):
            result = self.import_(
                ['value'],
                [
                    # import by display_name
                    [record1.display_name],
                    [record1.display_name],
                    [record2.display_name],
                ],
            )

        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 3)
        # correct ids assigned to corresponding records
        self.assertEqual([(record1.id, record1.display_name), (record1.id, record1.display_name), (record2.id, record2.display_name)], values(self.read()))

    def test_by_xid(self):
        # Importing through 'value/id' links the record owning that external id.
        record = self.env['export.integer'].create({'value': 42})
        xid = self.xid(record)

        result = self.import_(['value/id'], [[xid]])
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)
        b = self.browse()
        self.assertEqual(42, b[0].value.value)

    def test_by_id(self):
        # Importing through 'value/.id' links the record with that database id.
        record = self.env['export.integer'].create({'value': 42})
        result = self.import_(['value/.id'], [[record.id]])
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)
        b = self.browse()
        self.assertEqual(42, b[0].value.value)

    def test_by_names(self):
        # When two records share a display name, the import still succeeds,
        # emits a warning and links the first record.
        record1 = self.env['export.integer'].create({'value': 42})
        record2 = self.env['export.integer'].create({'value': 42})
        # names should be the same
        self.assertEqual(record1.display_name, record2.display_name)

        result = self.import_(['value'], [[record2.display_name]])
        self.assertEqual(result['messages'], [message('Found multiple matches for value "export.integer:42" in field "Value" (2 matches)', type_='warning')])
        self.assertEqual(len(result['ids']), 1)
        self.assertEqual(
            [
                (record1.id, record1.display_name),
            ],
            values(self.read()),
        )

    def test_fail_by_implicit_id(self):
        """Can't implicitly import records by id"""
        # create integer objects
        record1 = self.env['export.integer'].create({'value': 42})
        record2 = self.env['export.integer'].create({'value': 36})

        # Because name_search all the things. Fallback schmallback
        result = self.import_(
            ['value'],
            [
                # import by id, without specifying it
                [record1.id],
                [record2.id],
                [record1.id],
            ],
        )
        # one "no matching record" error per row, each carrying the raw id
        self.assertEqual(
            result['messages'],
            [
                message(
                    f"No matching record found for name '{id_}' in field 'Value'",
                    from_=index,
                    to_=index,
                    record=index,
                    moreinfo=moreaction(res_model='export.integer'),
                    field_name='Value',
                    field_path=['value'],
                    field_type='name',
                    value=id_,
                )
                for index, id_ in enumerate([record1.id, record2.id, record1.id])
            ],
        )
        self.assertIs(result['ids'], False)

    @mute_logger('odoo.sql_db')
    def test_fail_id_mistype(self):
        # A non-numeric cell in a '/.id' column is reported as an invalid
        # database id.
        result = self.import_(['value/.id'], [["foo"]])

        self.assertEqual(
            result['messages'],
            [
                message(
                    "Invalid database id 'foo' for the field 'Value'",
                    moreinfo=moreaction(res_model='ir.model.data', domain=[('model', '=', 'export.integer')]),
                    field_name='Value',
                    field_path=['value', '.id'],
                ),
            ],
        )
        self.assertIs(result['ids'], False)

    def test_sub_field(self):
        """Does not implicitly create the record, does not warn that you can't
        import m2o subfields (at all)...
        """
        result = self.import_(['value/value'], [['42']])
        self.assertEqual(result['messages'], [message("Can not create Many-To-One records indirectly, import the field separately")])
        self.assertIs(result['ids'], False)

    def test_fail_noids(self):
        # Each lookup mode (name, external id, database id) reports its own
        # "no matching record" error when the target does not exist.
        result = self.import_(['value'], [['nameisnoexist:3']])
        self.assertEqual(
            result['messages'],
            [
                message(
                    "No matching record found for name 'nameisnoexist:3' in field 'Value'",
                    moreinfo=moreaction(res_model='export.integer'),
                    field_name='Value',
                    field_path=['value'],
                    field_type='name',
                    value='nameisnoexist:3',
                )
            ],
        )
        self.assertIs(result['ids'], False)

        result = self.import_(['value/id'], [['noxidhere']])
        self.assertEqual(
            result['messages'],
            [
                message(
                    "No matching record found for external id 'noxidhere' in field 'Value'",
                    moreinfo=moreaction(res_model='ir.model.data', domain=[('model', '=', 'export.integer')]),
                    field_name='Value',
                    field_path=['value', 'id'],
                    field_type="external id",
                    value="noxidhere",
                )
            ],
        )
        self.assertIs(result['ids'], False)

        result = self.import_(['value/.id'], [['66']])
        self.assertEqual(
            result['messages'],
            [
                message(
                    "No matching record found for database id '66' in field 'Value'",
                    moreinfo=moreaction(res_model='ir.model.data', domain=[('model', '=', 'export.integer')]),
                    field_name='Value',
                    field_path=['value', '.id'],
                    field_type="database id",
                    value="66",
                )
            ],
        )
        self.assertIs(result['ids'], False)

    def test_fail_multiple(self):
        # Giving both a name column and an external id column for the same
        # field is rejected as ambiguous.
        result = self.import_(['value', 'value/id'], [['somename', 'somexid']])
        self.assertEqual(result['messages'], [message("Ambiguous specification for field 'Value', only provide one of name, external id or database id")])
        self.assertIs(result['ids'], False)

    def test_fail_id(self):
        # One invalid database id fails the import even though the second row
        # is valid; the first message is about the invalid id.
        record = self.env['export.integer'].create({'value': 42})
        result = self.import_(['value/.id'], [["lorem"], [record.id]])
        self.assertFalse(result['ids'])
        self.assertTrue(result['messages'])
        self.assertEqual(result['messages'][0]['message'], "Invalid database id 'lorem' for the field 'Value'")

    def test_name_create_enabled_m2o(self):
        # Without the 'name_create_enabled_fields' context key an unknown name
        # is an error; with it enabled for 'value' the same import succeeds.
        result = self.import_(['value'], [[101]])
        self.assertEqual(
            result['messages'],
            [
                message(
                    "No matching record found for name '101' in field 'Value'",
                    moreinfo=moreaction(res_model='export.integer'),
                    field_name='Value',
                    field_path=['value'],
                    field_type='name',
                    value=101,
                )
            ],
        )
        self.assertIs(result['ids'], False)
        context = {
            'name_create_enabled_fields': {'value': True},
        }
        result = self.import_(['value'], [[101]], context=context)
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)

    @mute_logger('odoo.sql_db')
    def test_name_create_enabled_m2o_required_field(self):
        # Uses a different model than the class default: only 'lorem', which
        # has no existing match, produces a message, and that message includes
        # the creation failure.
        self.model = self.env['export.many2one.required.subfield']
        self.env['export.with.required.field'].create({'name': 'ipsum', 'value': 10})
        context = {'name_create_enabled_fields': {'name': True}}
        result = self.import_(['name'], [['lorem'], ['ipsum']], context=context)
        messages = result['messages']
        self.assertTrue(messages)
        self.assertEqual(len(messages), 1)
        self.assertEqual(
            messages[0]['message'],
            "No matching record found for name 'lorem' in field 'Name' and the following error was "
            "encountered when we attempted to create one: Cannot create new 'export.with.required.field' "
            "records from their name alone. Please create those records manually and try importing again.",
        )


class TestInvalidStrings(ImporterCase):
    """Import of cells holding characters that cannot be handled."""

    model_name = 'export.m2o.str'

    @mute_logger('odoo.sql_db')
    def test_fail_unpaired_surrogate(self):
        """An unpaired surrogate yields a message mentioning surrogates."""
        outcome = self.import_(['child_id'], [['\uddff']])
        reported = outcome['messages']
        self.assertTrue(reported)
        self.assertIn('surrogates', reported[0]['message'])

    @mute_logger('odoo.sql_db')
    def test_fail_nul(self):
        """A NUL character yields a message mentioning NUL."""
        outcome = self.import_(['child_id'], [['\x00']])
        reported = outcome['messages']
        self.assertTrue(reported)
        self.assertIn('NUL', reported[0]['message'])


class test_m2m(ImporterCase):
    """Import of many2many fields by database id, external id and name."""

    model_name = 'export.many2many'

    def _other(self, value, label):
        """Create one ``export.many2many.other`` record to be used as m2m target."""
        return self.env['export.many2many.other'].create({'value': value, 'str': label})

    # apparently, one and only thing which works is a
    # csv_internal_sep-separated list of ids, xids, or names (depending if
    # m2m/.id, m2m/id or m2m[/anythingelse]
    def test_ids(self):
        """Comma-separated database ids link the matching records."""
        first = self._other(3, 'record0').id
        second = self._other(44, 'record1').id
        third = self._other(84, 'record2').id
        fourth = self._other(9, 'record3').id
        fifth = self._other(99, 'record4').id

        rows = [
            [f'{first},{second}'],
            [f'{first},{third},{fourth}'],
            [f'{first},{second},{third}'],
            [f'{fifth}'],
        ]
        outcome = self.import_(['value/.id'], rows)
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 4)

        imported = self.browse()
        self.assertEqual([rec.id for rec in imported[0].value], [first, second])
        self.assertEqual(values(imported[0].value), [3, 44])

        self.assertEqual([rec.id for rec in imported[2].value], [first, second, third])
        self.assertEqual(values(imported[2].value), [3, 44, 84])

    def test_noids(self):
        """An unknown database id yields a single error and no created ids."""
        outcome = self.import_(['value/.id'], [['42']])
        expected = message(
            "No matching record found for database id '42' in field 'Value'",
            moreinfo=moreaction(res_model='ir.model.data', domain=[('model', '=', 'export.many2many.other')]),
            field_name='Value',
            field_path=['value', '.id'],
            field_type="database id",
            value='42',
        )
        self.assertEqual(outcome['messages'], [expected])
        self.assertIs(outcome['ids'], False)

    def test_xids(self):
        """Comma-separated external ids link the matching records."""
        rec0 = self._other(3, 'record0')
        rec1 = self._other(44, 'record1')
        rec2 = self._other(84, 'record2')
        rec3 = self._other(9, 'record3')

        # self.xid() is called inline, in the same order as the rows are built.
        rows = [
            [f'{self.xid(rec0)},{self.xid(rec1)}'],
            [f'{self.xid(rec3)}'],
            [f'{self.xid(rec2)},{self.xid(rec1)}'],
        ]
        outcome = self.import_(['value/id'], rows)
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 3)

        imported = self.browse()
        self.assertCountEqual(values(imported[0].value), [3, 44])
        self.assertCountEqual(values(imported[2].value), [44, 84])

    def test_noxids(self):
        """An unknown external id yields a single error and no created ids."""
        outcome = self.import_(['value/id'], [['noxidforthat']])
        expected = message(
            "No matching record found for external id 'noxidforthat' in field 'Value'",
            moreinfo=moreaction(res_model='ir.model.data', domain=[('model', '=', 'export.many2many.other')]),
            field_name='Value',
            field_path=['value', 'id'],
            field_type='external id',
            value='noxidforthat',
        )
        self.assertEqual(outcome['messages'], [expected])
        self.assertIs(outcome['ids'], False)

    def test_names(self):
        """Comma-separated display names link the matching records."""
        rec0 = self._other(3, 'record0')
        rec1 = self._other(44, 'record1')
        rec2 = self._other(84, 'record2')
        rec3 = self._other(9, 'record3')

        rows = [
            [f'{rec1.display_name},{rec2.display_name}'],
            [f'{rec0.display_name},{rec1.display_name},{rec2.display_name}'],
            [f'{rec0.display_name},{rec3.display_name}'],
        ]
        outcome = self.import_(['value'], rows)
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 3)

        imported = self.browse()
        self.assertEqual(values(imported[1].value), [3, 44, 84])
        self.assertEqual(values(imported[2].value), [3, 9])

    def test_nonames(self):
        """An unknown name yields a single error and no created ids."""
        outcome = self.import_(['value'], [['wherethem2mhavenonames']])
        expected = message(
            "No matching record found for name 'wherethem2mhavenonames' in field 'Value'",
            moreinfo=moreaction(res_model='export.many2many.other'),
            field_name='Value',
            field_path=['value'],
            field_type="name",
            value='wherethem2mhavenonames',
        )
        self.assertEqual(outcome['messages'], [expected])
        self.assertIs(outcome['ids'], False)

    def test_import_to_existing(self):
        """Re-importing the same external id replaces the m2m values of the record."""
        first = self._other(3, 'record0').id
        second = self._other(44, 'record1').id
        third = self._other(84, 'record2').id
        fourth = self._other(9, 'record3').id

        header = ['id', 'value/.id']
        external_id = 'myxid'

        outcome = self.import_(header, [[external_id, f'{first},{second}']])
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 1)

        outcome = self.import_(header, [[external_id, f'{third},{fourth}']])
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 1)

        imported = self.browse()
        self.assertEqual(len(imported), 1)
        # TODO: replacement of existing m2m values is correct?
        self.assertEqual(values(imported[0].value), [84, 9])


class test_o2m(ImporterCase):
    """Import of one2many fields: sub-records, inline links and nested many2one."""

    model_name = 'export.one2many'

    def test_display_name(self):
        """A bare o2m column is matched by name; the value echoed back is cut at 50 chars."""
        s = 'Java is a DSL for taking large XML files and converting them to stack traces'
        result = self.import_(['const', 'value'], [['5', s]])
        self.assertEqual(
            result['messages'],
            [
                message(
                    "No matching record found for name '%s' in field 'Value'" % s[:50],
                    moreinfo=moreaction(res_model='export.one2many.child'),
                    field_name='Value',
                    field_path=['value'],
                    field_type='name',
                    value=s[:50],
                )
            ],
        )
        self.assertIs(result['ids'], False)

    def test_single(self):
        """One row creates one parent with one child."""
        result = self.import_(
            ['const', 'value/value'],
            [
                ['5', '63'],
            ],
        )
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)

        (b,) = self.browse()
        self.assertEqual(b.const, 5)
        self.assertEqual(values(b.value), [63])

    def test_multicore(self):
        """Two rows with parent data create two parents, one child each."""
        result = self.import_(
            ['const', 'value/value'],
            [
                ['5', '63'],
                ['6', '64'],
            ],
        )
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 2)

        b1, b2 = self.browse()
        self.assertEqual(b1.const, 5)
        self.assertEqual(values(b1.value), [63])
        self.assertEqual(b2.const, 6)
        self.assertEqual(values(b2.value), [64])

    def test_multisub(self):
        """Rows with an empty parent column add children to the previous parent."""
        result = self.import_(
            ['const', 'value/value'],
            [
                ['5', '63'],
                ['', '64'],
                ['', '65'],
                ['', '66'],
            ],
        )
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)

        (b,) = self.browse()
        self.assertEqual(set(values(b.value)), {63, 64, 65, 66})

    def test_multi_subfields(self):
        """Several sub-field columns of the same o2m are combined per child row."""
        result = self.import_(
            ['value/str', 'const', 'value/value'],
            [
                ['this', '5', '63'],
                ['is', '', '64'],
                ['the', '', '65'],
                ['rhythm', '', '66'],
            ],
        )
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)

        (b,) = self.browse()
        self.assertEqual(set(values(b.value.sorted())), {63, 64, 65, 66})
        self.assertEqual(values(b.value.sorted(), 'str'), 'this is the rhythm'.split())

    def test_subfields_fail_by_implicit_id(self):
        """A many2one sub-field without suffix is looked up by name and fails."""
        result = self.import_(['value/parent_id'], [['noxidforthat']])
        self.assertEqual(
            result['messages'],
            [
                message(
                    "No matching record found for name 'noxidforthat' in field 'Value/Parent'",
                    moreinfo=moreaction(res_model='export.one2many'),
                    field_name='Value',
                    field_path=['value', 'parent_id'],
                    field_type='name',
                    value='noxidforthat',
                )
            ],
        )
        self.assertIs(result['ids'], False)

    def test_link_inline(self):
        """m2m-style specification for o2ms"""
        id1 = self.env['export.one2many.child'].create({'str': 'Bf', 'value': 109}).id
        id2 = self.env['export.one2many.child'].create({'str': 'Me', 'value': 262}).id

        result = self.import_(
            ['const', 'value/.id'],
            [
                ['42', '%d,%d' % (id1, id2)],
            ],
        )
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)

        [b] = self.browse()
        self.assertEqual(b.const, 42)
        # automatically forces link between core record and o2ms
        self.assertEqual(set(values(b.value)), {109, 262})
        self.assertEqual(values(b.value, field='parent_id'), [b, b])

    def test_link(self):
        """O2M relating to an existing record (update) force a LINK_TO as well"""
        id1 = self.env['export.one2many.child'].create({'str': 'Bf', 'value': 109}).id
        id2 = self.env['export.one2many.child'].create({'str': 'Me', 'value': 262}).id

        result = self.import_(
            ['const', 'value/.id'],
            [
                ['42', str(id1)],
                ['', str(id2)],
            ],
        )
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)

        [b] = self.browse()
        self.assertEqual(b.const, 42)
        # automatically forces link between core record and o2ms
        self.assertCountEqual(values(b.value), [109, 262])
        self.assertEqual(values(b.value, field='parent_id'), [b, b])

    def test_link_2(self):
        """Existing children referenced by database id are linked and updated."""
        id1 = self.env['export.one2many.child'].create({'str': 'Bf', 'value': 109}).id
        id2 = self.env['export.one2many.child'].create({'str': 'Me', 'value': 262}).id

        result = self.import_(
            ['const', 'value/.id', 'value/value'],
            [
                ['42', str(id1), '1'],
                ['', str(id2), '2'],
            ],
        )
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)

        [b] = self.browse()
        self.assertEqual(b.const, 42)
        self.assertEqual(set(values(b.value)), {1, 2})
        self.assertEqual(values(b.value, field='parent_id'), [b, b])

    def test_o2m_repeated_with_xids(self):
        """Importing the same external ids twice updates the records in place."""
        # concern: formerly this would link existing records, and fault if
        # the records did not exist. This is more in line with other XID uses,
        # however it does make thing work where they'd previously fail for
        # well-defined reasons.
        result = self.import_(
            ['id', 'const', 'value/id', 'value/value'],
            [
                ['a', '5', 'aa', '11'],
                ['', '', 'ab', '12'],
                ['', '', 'ac', '13'],
                ['', '', 'ad', '14'],
                ['b', '10', 'ba', '15'],
                ['', '', 'bb', '16'],
            ],
        )
        self.assertFalse(result['messages'])
        result = self.import_(
            ['id', 'const', 'value/id', 'value/value'],
            [
                ['a', '5', 'aa', '11'],
                ['', '', 'ab', '12'],
                ['', '', 'ac', '13'],
                ['', '', 'ad', '14'],
                ['b', '8', 'ba', '25'],
                ['', '', 'bb', '16'],
            ],
        )
        self.assertFalse(result['messages'])

        [a, b] = self.browse().sorted(lambda r: r.const)
        self.assertEqual(len(a.value), 4)
        self.assertEqual(len(b.value), 2)
        self.assertEqual(b.const, 8)
        self.assertEqual(b.value.mapped('value'), [25, 16])

    def test_name_create_enabled_m2o_in_o2m(self):
        """Name-creation of a many2one nested in an o2m is driven by the context."""
        result = self.import_(['value/m2o'], [[101]])
        self.assertEqual(
            result['messages'],
            [
                message(
                    "No matching record found for name '101' in field 'Value/M2O'",
                    moreinfo=moreaction(res_model='export.integer'),
                    field_name='Value',
                    field_path=['value', 'm2o'],
                    field_type='name',
                    value=101,
                )
            ],
        )
        # Identity check, like the other failure tests: assertEqual would also
        # accept 0 because ``0 == False``.
        self.assertIs(result['ids'], False)
        context = {
            'name_create_enabled_fields': {'value/m2o': True},
        }
        result = self.import_(['value/m2o'], [[101]], context=context)
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 1)
        [b] = self.browse()
        self.assertEqual(b.value.m2o.value, 101)

    def test_escape_m2o_in_o2m(self):
        """A '%' in the looked-up name is reported verbatim in the error message."""
        result = self.import_(['value/m2o'], [['21%']])
        self.assertEqual(
            result['messages'],
            [
                message(
                    "No matching record found for name '21%' in field 'Value/M2O'",
                    moreinfo=moreaction(res_model='export.integer'),
                    field_name='Value',
                    field_path=['value', 'm2o'],
                    field_type='name',
                    value='21%',
                )
            ],
        )


class test_o2m_multiple(ImporterCase):
    """Import of a model that carries two one2many fields side by side."""

    model_name = 'export.one2many.multiple'

    def _import_single_parent(self, rows):
        """Import ``rows`` and return the one parent record they must produce.

        Asserts that the import reported no message and created exactly one id.
        """
        outcome = self.import_(['const', 'child1/value', 'child2/value'], rows)
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 1)

        (parent,) = self.browse()
        return parent

    def test_multi_mixed(self):
        """Both o2m columns are filled on the same rows until one runs out."""
        parent = self._import_single_parent([
            ['5', '11', '21'],
            ['', '12', '22'],
            ['', '13', '23'],
            ['', '14', ''],
        ])
        self.assertEqual(set(values(parent.child1)), {11, 12, 13, 14})
        self.assertEqual(set(values(parent.child2)), {21, 22, 23})

    def test_multi(self):
        """The second o2m continues on rows of its own after the first one."""
        parent = self._import_single_parent([
            ['5', '11', '21'],
            ['', '12', ''],
            ['', '13', ''],
            ['', '14', ''],
            ['', '', '22'],
            ['', '', '23'],
        ])
        self.assertEqual(set(values(parent.child1)), {11, 12, 13, 14})
        self.assertEqual(set(values(parent.child2)), {21, 22, 23})

    def test_multi_fullsplit(self):
        """The two o2m columns never share a row."""
        parent = self._import_single_parent([
            ['5', '11', ''],
            ['', '12', ''],
            ['', '13', ''],
            ['', '14', ''],
            ['', '', '21'],
            ['', '', '22'],
            ['', '', '23'],
        ])
        self.assertEqual(parent.const, 5)
        self.assertEqual(set(values(parent.child1)), {11, 12, 13, 14})
        self.assertEqual(set(values(parent.child2)), {21, 22, 23})


class test_realworld(SavepointCaseWithUserDemo):
    """Imports based on JSON fixture files and on nested one2many structures."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls._load_partners_set()

    def test_bigfile(self):
        """Load the big contacts fixture; every row must create one partner."""
        # Context manager so the fixture file handle is closed deterministically.
        with file_open('test_import_export/data/contacts_big.json', 'rt') as fixture:
            data = json.load(fixture)
        result = self.env['res.partner'].load(['name', 'phone', 'email', 'image_1920'], data)
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), len(data))

    def test_backlink(self):
        """Load the contacts fixture, which includes a ``parent_id`` column."""
        fnames = ["name", "type", "street", "city", "country_id", "category_id", "is_company", "parent_id"]
        # Context manager so the fixture file handle is closed deterministically.
        with file_open('test_import_export/data/contacts.json', 'rt') as fixture:
            data = json.load(fixture)
        result = self.env['res.partner'].load(fnames, data)
        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), len(data))

    def test_recursive_o2m(self):
        """The content of the o2m field's dict needs to go through conversion
        as it may be composed of convertables or other relational fields
        """
        self.env.registry.clear_cache()
        Model = self.env['export.one2many.recursive']
        result = Model.load(
            ['value', 'child/const', 'child/child1/str', 'child/child2/value'],
            [
                ['4', '42', 'foo', '55'],
                ['', '43', 'bar', '56'],
                ['', '', 'baz', ''],
                ['', '55', 'qux', '57'],
                ['5', '99', 'wheee', ''],
                ['', '98', '', '12'],
            ],
        )

        self.assertFalse(result['messages'])
        self.assertEqual(len(result['ids']), 2)

        b = Model.browse(result['ids'])
        self.assertEqual((b[0].value, b[1].value), (4, 5))

        self.assertEqual([child.str for child in b[0].child.sorted()[1].child1], ['bar', 'baz'])
        self.assertFalse(len(b[1].child.sorted()[1].child1))
        self.assertEqual([child.value for child in b[1].child.sorted()[1].child2], [12])

    def test_o2m_subfields_fail_by_implicit_id(self):
        """A many2one two o2m levels deep is looked up by name; the error names the full path."""
        self.env.registry.clear_cache()
        Model = self.env['export.one2many.recursive']
        result = Model.with_context(import_file=True).load(
            ['child/child1/parent_id'],
            [['5']],
        )
        self.assertEqual(
            result['messages'],
            [
                message(
                    "No matching record found for name '5' in field 'Child/Child1/Parent'",
                    field='child',
                    moreinfo=moreaction(res_model='export.one2many.multiple'),
                    field_name='Child',
                    field_path=['child', 'child1', 'parent_id'],
                    field_type='name',
                    value='5',
                )
            ],
        )
        self.assertIs(result['ids'], False)

    def test_no_install_mode(self):
        """Test that the data is imported without the `install_mode` context key"""
        self.env.registry.clear_cache()
        Model = self.env['export.with.non.demo.constraint']
        result = Model.with_context(import_file=True).load(
            ['name'],
            [['test']],
        )
        self.assertEqual(len(result['messages']), 1)
        self.assertEqual(result['messages'][0]['message'], "Name must start with an uppercase letter")
        self.assertIs(result['ids'], False)


class test_date(ImporterCase):
    """Import of date fields."""

    model_name = 'export.date'

    def test_empty(self):
        """Importing no rows returns an empty, message-free result."""
        outcome = self.import_(['value'], [])
        self.assertEqual(outcome, {'ids': [], 'messages': [], 'nextrow': False})

    def test_basic(self):
        """An ISO-formatted date is imported as one record."""
        outcome = self.import_(['value'], [['2012-02-03']])
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 1)

    def test_invalid(self):
        """An unparsable date yields one error hinting at the expected format."""
        outcome = self.import_(['value'], [['not really a date']])
        expected = message(
            "'not really a date' does not seem to be a valid date for field 'Value'",
            moreinfo="Use the format '2012-12-31'",
            field_name='Value',
            field_path=['value'],
        )
        self.assertEqual(outcome['messages'], [expected])
        self.assertIs(outcome['ids'], False)


class test_datetime(ImporterCase):
    """Import of datetime fields, including the timezone used for conversion."""

    model_name = 'export.datetime'

    def _stored_values(self, record_ids):
        """Read ``value`` of the given records and return them as datetime strings."""
        rows = self.read(domain=[('id', 'in', record_ids)])
        return [fields.Datetime.to_string(row['value']) for row in rows]

    def test_empty(self):
        """Importing no rows returns an empty, message-free result."""
        outcome = self.import_(['value'], [])
        self.assertEqual(outcome, {'ids': [], 'messages': [], 'nextrow': False})

    def test_basic(self):
        """A well-formed datetime is imported as one record."""
        outcome = self.import_(['value'], [['2012-02-03 11:11:11']])
        self.assertFalse(outcome['messages'])
        self.assertEqual(len(outcome['ids']), 1)

    def test_invalid(self):
        """An unparsable datetime yields one error hinting at the expected format."""
        outcome = self.import_(['value'], [['not really a datetime']])
        expected = message(
            "'not really a datetime' does not seem to be a valid datetime for field 'Value'",
            moreinfo="Use the format '2012-12-31 23:59:59'",
            field_name='Value',
            field_path=['value'],
        )
        self.assertEqual(outcome['messages'], [expected])
        self.assertIs(outcome['ids'], False)

    def test_checktz1(self):
        """Imported date should be interpreted as being in the tz provided by
        the context
        """
        # write dummy tz in user (Asia/Hovd UTC+0700), should be superseded by
        # context
        self.env.user.write({'tz': 'Asia/Hovd'})

        # UTC+1400
        outcome = self.import_(['value'], [['2012-02-03 11:11:11']], context={'tz': 'Pacific/Kiritimati'})
        self.assertFalse(outcome['messages'])
        self.assertEqual(self._stored_values(outcome['ids']), ['2012-02-02 21:11:11'])

        # UTC-0930
        outcome = self.import_(['value'], [['2012-02-03 11:11:11']], context={'tz': 'Pacific/Marquesas'})
        self.assertFalse(outcome['messages'])
        self.assertEqual(self._stored_values(outcome['ids']), ['2012-02-03 20:41:11'])

    def test_usertz(self):
        """If the context does not hold a timezone, the importing user's tz
        should be used
        """
        # UTC +1000
        self.env.user.write({'tz': 'Asia/Yakutsk'})

        outcome = self.import_(['value'], [['2012-02-03 11:11:11']])
        self.assertFalse(outcome['messages'])
        self.assertEqual(self._stored_values(outcome['ids']), ['2012-02-03 01:11:11'])

    def test_notz(self):
        """If there is no tz either in the context or on the user, falls back
        to UTC
        """
        self.env.user.write({'tz': False})

        outcome = self.import_(['value'], [['2012-02-03 11:11:11']])
        self.assertFalse(outcome['messages'])
        self.assertEqual(self._stored_values(outcome['ids']), ['2012-02-03 11:11:11'])


class test_unique(ImporterCase):
    """Unique-constraint violations must be reported against the offending rows."""

    model_name = 'export.unique'

    @mute_logger('odoo.sql_db')
    def test_unique(self):
        """Duplicates of ``value`` are reported on rows 1 and 4."""
        outcome = self.import_(['value'], [['1'], ['1'], ['2'], ['3'], ['3']])
        self.assertFalse(outcome['ids'])

        duplicate_text = "The value for 'value' (Value) already exists."
        reported = outcome['messages']
        # The message text is only checked by inclusion, so strip it off and
        # compare the remaining keys exactly.
        reported_texts = [entry.pop('message') for entry in reported]
        expected_meta = [
            {'type': 'error', 'rows': {'from': row, 'to': row}, 'record': row, 'field': 'value'}
            for row in (1, 4)
        ]
        for text in reported_texts[:len(expected_meta)]:
            self.assertIn(duplicate_text, text)
        self.assertEqual(reported, expected_meta)

    @mute_logger('odoo.sql_db')
    def test_unique_pair(self):
        """A duplicated (value2, value3) pair is reported on the repeated row."""
        rows = [
            ['0', '1'],
            ['1', '0'],
            ['1', '1'],
            ['1', '1'],
        ]
        outcome = self.import_(['value2', 'value3'], rows)
        self.assertFalse(outcome['ids'])
        self.assertEqual(len(outcome['messages']), 1)

        error = outcome['messages'][0]
        self.assertEqual(error['type'], 'error')
        self.assertEqual(error['record'], 3)
        self.assertEqual(error['rows'], {'from': 3, 'to': 3})
        self.assertIn("The value for 'value2, value3' (Value2 and Value3) already exists.", error['message'])

    @mute_logger('odoo.sql_db')
    def test_unique_update_record(self):
        """Updating a record to a value held by another one is reported on row 0."""
        first, second = self.env[self.model_name].create([{'value': 1}, {'value': 2}])
        outcome = self.import_(['.id', 'value'], [[str(second.id), '1']])
        self.assertFalse(outcome['ids'])
        self.assertEqual(len(outcome['messages']), 1)

        error = outcome['messages'][0]
        self.assertEqual(error['type'], 'error')
        self.assertEqual(error['record'], 0)
        self.assertEqual(error['rows'], {'from': 0, 'to': 0})
        self.assertIn("The value for 'value' (Value) already exists.", error['message'])


class test_inherits(ImporterCase):
    """The import process should only assign a new xid (derived from the
    child's) if the child is being created and triggers the creation of the
    parent
    """

    model_name = 'export.inherits.child'

    def test_create_no_parent(self):
        """Creating a child without a parent also gives the new parent an xid."""
        r = self.import_(
            ['id', 'value_parent', 'value'],
            [
                ['xxx.child', '0', '1'],
            ],
        )
        rec = self.env[self.model_name].browse(r['ids'])

        self.assertEqual(rec.value_parent, 0)
        self.assertEqual(rec.value, 1)
        self.assertEqual(rec.parent_id.value_parent, 0)
        self.assertEqual(
            rec._get_external_ids()[rec.id],
            ['xxx.child'],
        )
        self.assertEqual(
            rec.parent_id._get_external_ids()[rec.parent_id.id],
            ['xxx.child_export_inherits_parent'],
        )

    def test_create_parent_no_xid(self):
        """Linking to an existing parent without xid must not create one for it."""
        parent = self.env['export.inherits.parent'].create({'value_parent': 0})
        r = self.import_(
            ['id', 'parent_id/.id', 'value'],
            [
                ['xxx.child', str(parent.id), '1'],
            ],
        )
        rec = self.env[self.model_name].browse(r['ids'])
        self.assertEqual(rec.value_parent, 0)
        self.assertEqual(rec.parent_id, parent)

        self.assertEqual(
            rec._get_external_ids()[rec.id],
            ['xxx.child'],
        )
        self.assertEqual(
            rec.parent_id._get_external_ids()[rec.parent_id.id],
            [],
            "no xid should be created for the parent",
        )

    def test_create_parent_with_xid(self):
        """Linking by database id to a parent that has an xid keeps that xid."""
        parent = self.env['export.inherits.parent'].create({'value_parent': 0})
        pid = self.env['ir.model.data'].create(
            {
                'model': 'export.inherits.parent',
                'res_id': parent.id,
                'module': 'xxx',
                'name': 'parent',
            }
        )
        r = self.import_(
            ['id', 'parent_id/.id', 'value'],
            [
                ['xxx.child', str(parent.id), '1'],
            ],
        )
        rec = self.env[self.model_name].browse(r['ids'])
        self.assertEqual(rec.value_parent, 0)
        self.assertEqual(rec.parent_id, parent)
        # assertEqual, not assertTrue: the latter would treat parent.id as the
        # failure message and never compare the two values.
        self.assertEqual(pid.exists().res_id, parent.id)

        self.assertEqual(
            rec._get_external_ids()[rec.id],
            ['xxx.child'],
        )
        self.assertEqual(
            rec.parent_id._get_external_ids()[rec.parent_id.id],
            ['xxx.parent'],
        )

    def test_create_parent_by_xid(self):
        """Linking by external id to an existing parent keeps that xid."""
        parent = self.env['export.inherits.parent'].create({'value_parent': 0})
        pid = self.env['ir.model.data'].create(
            {
                'model': 'export.inherits.parent',
                'res_id': parent.id,
                'module': 'xxx',
                'name': 'parent',
            }
        )
        r = self.import_(
            ['id', 'parent_id/id', 'value'],
            [
                ['xxx.child', 'xxx.parent', '1'],
            ],
        )
        rec = self.env[self.model_name].browse(r['ids'])
        self.assertEqual(rec.value_parent, 0)
        self.assertEqual(rec.parent_id, parent)
        # assertEqual, not assertTrue: the latter would treat parent.id as the
        # failure message and never compare the two values.
        self.assertEqual(pid.exists().res_id, parent.id)

        self.assertEqual(
            rec._get_external_ids()[rec.id],
            ['xxx.child'],
        )
        self.assertEqual(
            rec.parent_id._get_external_ids()[rec.parent_id.id],
            ['xxx.parent'],
        )

    def test_update_parent_no_xid(self):
        """Updating an existing child must not create an xid for its parent."""
        parent = self.env['export.inherits.parent'].create({'value_parent': 0})
        child = self.env[self.model_name].create(
            {
                'parent_id': parent.id,
                'value': 1,
            }
        )
        self.env['ir.model.data'].create(
            {
                'model': self.model_name,
                'res_id': child.id,
                'module': 'xxx',
                'name': 'child',
            }
        )

        self.import_(
            ['id', 'value'],
            [
                ['xxx.child', '42'],
            ],
        )
        self.assertEqual(child.value, 42)
        self.assertEqual(child.parent_id, parent)

        self.assertEqual(
            child._get_external_ids()[child.id],
            ['xxx.child'],
        )
        self.assertEqual(
            parent._get_external_ids()[parent.id],
            [],
        )

    def test_update_parent_with_xid(self):
        """Updating an existing child leaves both existing xids untouched."""
        parent = self.env['export.inherits.parent'].create({'value_parent': 0})
        child = self.env[self.model_name].create(
            {
                'parent_id': parent.id,
                'value': 1,
            }
        )
        pid, cid = self.env['ir.model.data'].create(
            [
                {
                    'model': 'export.inherits.parent',
                    'res_id': parent.id,
                    'module': 'xxx',
                    'name': 'parent',
                },
                {
                    'model': self.model_name,
                    'res_id': child.id,
                    'module': 'xxx',
                    'name': 'child',
                },
            ]
        )

        self.import_(
            ['id', 'value'],
            [
                ['xxx.child', '42'],
            ],
        )
        self.assertEqual(child.value, 42)
        self.assertEqual(child.parent_id, parent)
        self.assertEqual(pid.exists().res_id, parent.id)
        self.assertEqual(cid.exists().res_id, child.id)

        self.assertEqual(
            child._get_external_ids()[child.id],
            ['xxx.child'],
        )
        self.assertEqual(
            parent._get_external_ids()[parent.id],
            ['xxx.parent'],
        )


class CheckSavepoint(ImporterCase):
    """Bound the number of savepoints a single ``load`` call may open."""

    model_name = 'export.unique'

    @mute_logger("odoo.sql_db")
    def test_max_savepoint(self):
        """Import 120 rows, 20 of them duplicates, while counting cursor calls.

        ``Cursor.savepoint`` and ``Cursor.flush`` are wrapped with spies that
        still delegate to the real implementations.
        """
        from odoo.sql_db import Cursor  # noqa: PLC0415

        # 0..99 followed by 0..19 again.
        rows = [[str(number)] for number in (*range(100), *range(20))]

        with (
            patch.object(Cursor, 'savepoint', autospec=True, side_effect=Cursor.savepoint) as savepoint_spy,
            patch.object(Cursor, 'flush', autospec=True, side_effect=Cursor.flush) as flush_spy,
        ):
            self.import_(['value'], rows)
            self.assertLess(
                savepoint_spy.call_count,
                100,
                "Too many savepoints in the same transaction, load method should not create a savepoint for each record",
            )
            self.assertEqual(savepoint_spy.call_count, 2, "Too many savepoints in the same transaction")
            self.assertGreater(
                flush_spy.call_count,
                120,
                "We should flush after each record to bind errors to the record",
            )
